package com.jie.flink.cdc.flinksink;

import com.jie.flink.cdc.flinksink.config.ElasticsearchConfigProperties;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.bouncycastle.util.Arrays;

/**
 * @author zhanggj
 * @date 2023/5/29 10:21
 * @desc
 */
public class EsFlinkSinkBuilder implements FlinkSinkBuilder<ElasticsearchConfigProperties> {
    private EsFlinkSinkBuilder() {
    }
    private static class InstanceHolder{
        private static EsFlinkSinkBuilder INSTANCE = new EsFlinkSinkBuilder();
    }
    public static EsFlinkSinkBuilder getInstance() {
        return InstanceHolder.INSTANCE;
    }
    @Override
    public Sink<String> buildSink(final ElasticsearchConfigProperties elasticsearchConfigProperties) {
        if (Arrays.isNullOrEmpty(elasticsearchConfigProperties.getHosts())) {
            return null;
        }
        HttpHost[] httpHosts = new HttpHost[elasticsearchConfigProperties.getHosts().length];
        for (int i = 0; i < elasticsearchConfigProperties.getHosts().length; i++) {
            httpHosts[i] = HttpHost.create(elasticsearchConfigProperties.getHosts()[i]);
        }
        ElasticsearchSink<String> elasticsearchSink = new Elasticsearch7SinkBuilder()
                .setHosts(httpHosts)
                .setConnectionUsername(elasticsearchConfigProperties.getUserName())
                .setConnectionPassword(elasticsearchConfigProperties.getPassword())
                .setEmitter(new ElasticsearchEmitterImpl())
                .build();
        return elasticsearchSink;
    }
}
